package com.hzya.frame;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @Author：liuyang
 * @Package：com.hzya.frame
 * @Project：nifi-hzyadev-bundle
 * @name：DevJsonSplitterProcessor
 * @Date：2025/7/11 09:27
 * @Filename：DevJsonSplitterProcessor
 */

@Tags({"json", "split", "flatten", "transform", "custom"})
@CapabilityDescription("将多级JSON拆分为多个FlowFiles，使父字段名变平并添加前缀。如果JSON是单层的，它会传递原始的FlowFile。")
@ReadsAttributes({@ReadsAttribute(attribute = "storageprefix", description = "用于“table_name”属性的前缀。")})
@WritesAttributes({@WritesAttribute(attribute = "table_name", description = "为扁平化JSON数据生成的表名。")})
public class DevJsonSplitterProcessor extends AbstractProcessor {

    // 定义成功关系，用于拆分和处理后的 FlowFile
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("已成功拆分和展平的FlowFiles。").build();

    // 定义原始关系，用于未拆分的单层级 JSON FlowFile
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("未拆分的原始FlowFiles（单层JSON）。").build();

    // 定义失败关系，用于未拆分的单层级 JSON FlowFile
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("如果在处理过程中发生错误，则将原始FlowFile路由到此关系。").build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;
    private ObjectMapper objectMapper;

    // 这是你应该重写的方法
    @Override
    protected void init(final ProcessorInitializationContext context) { // 注意这里是 protected void init
        // 初始化属性列表
        final List<PropertyDescriptor> properties = new ArrayList<>();
        // 示例：如果你想让用户配置 storageprefix，可以这样定义一个属性：
        // properties.add(new PropertyDescriptor.Builder()
        //         .name("Storage Prefix")
        //         .description("The prefix to use for the 'table_name' attribute.")
        //         .required(true)
        //         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        //         .build());
        this.properties = Collections.unmodifiableList(properties);

        // 初始化关系集合
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);

        // 初始化 Jackson ObjectMapper
        this.objectMapper = new ObjectMapper();

        // 可以在这里获取 Logger，但通常在 onTrigger 中使用 ProcessorContext 提供的getLogger() 更好
        // this.getLogger().info("SplitAndFlattenJson processor initialized.");
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // 获取输入的 FlowFile
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // 获取 storageprefix 属性
        final String storagePrefix = flowFile.getAttribute("storageprefix");
        if (storagePrefix == null) {
            getLogger().warn("FlowFile｛｝没有“storageprefix”属性。转为REL_FAILURE。", flowFile);
            session.transfer(flowFile, REL_FAILURE); // 如果没有 storageprefix，则直接传递给失败
            return;
        }

        final AtomicReference<String> jsonContent = new AtomicReference<>();
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                // 使用 IOUtils 读取所有字节
                byte[] bytes = IOUtils.toByteArray(in);
                jsonContent.set(new String(bytes, "UTF-8"));
            }
        });

        JsonNode rootNode;
        try {
            // 解析 JSON 字符串
            rootNode = objectMapper.readTree(jsonContent.get());
        } catch (IOException e) {
            getLogger().error("未能解析FlowFile{}的JSON:{}", flowFile, e.getMessage(), e);
            session.transfer(flowFile, REL_FAILURE); // 解析失败，传递给失败
            return;
        }

        // 检查 JSON 是否为单层级
        if (isSingleLevel(rootNode)) {
            getLogger().debug("FlowFile{}包含单层JSON。转为REL_ORIGINAL", flowFile);
            session.transfer(flowFile, REL_ORIGINAL);
            return;
        }

        // 递归处理 JSON 节点
        processNode(rootNode, "", "", flowFile, storagePrefix, session);

        // 移除原始 FlowFile
        session.remove(flowFile);
        getLogger().debug("已处理并删除原始FlowFile {}.", flowFile);
    }

    /**
     * 检查 JSON 节点是否为单层级。
     * 单层级定义为：
     * 1. 如果是对象，所有值都不是对象或数组。
     * 2. 如果是数组，所有元素都是对象，且这些对象的所有值都不是对象或数组。
     *
     * @param node 要检查的 JSON 节点
     * @return 如果是单层级 JSON，则返回 true；否则返回 false。
     */
    private boolean isSingleLevel(JsonNode node) {
        if (node.isObject()) {
            // 如果是对象，检查所有字段的值是否都是非对象或非数组
            for (JsonNode child : node) {
                if (child.isObject() || child.isArray()) {
                    return false; // 包含子对象或子数组，不是单层级
                }
            }
            return true; // 所有值都是非对象或非数组，是单层级
        } else if (node.isArray()) {
            // 如果是数组，检查所有元素是否都是单层级对象
            for (JsonNode element : node) {
                if (!element.isObject() || !isSingleLevel(element)) {
                    return false; // 包含非对象元素或非单层级对象元素，不是单层级
                }
            }
            return true; // 所有元素都是单层级对象，是单层级
        }
        return true; // 其他类型（如基本类型），也视为单层级
    }


    /**
     * 递归处理 JSON 节点，生成 FlowFile。
     *
     * @param node           当前要处理的 JSON 节点
     * @param parentName     父级字段名（用于拼接）
     * @param path           当前节点在 JSON 结构中的路径（用于生成 table_name）
     * @param parentFlowFile 原始 FlowFile
     * @param storagePrefix  storageprefix 属性值
     * @param session        NiFi 处理会话
     */
    private void processNode(JsonNode node, String parentName, String path, FlowFile parentFlowFile, String storagePrefix, ProcessSession session) {

        if (node.isObject()) {
            // 如果是 JSON 对象
            ObjectNode objectNode = (ObjectNode) node;

            // 检查是否为叶子对象 (所有值都不是对象或数组)
            boolean isLeafObject = true;
            Iterator<Map.Entry<String, JsonNode>> fields = objectNode.fields();
            while (fields.hasNext()) {
                Map.Entry<String, JsonNode> field = fields.next();
                if (field.getValue().isObject() || field.getValue().isArray()) {
                    isLeafObject = false;
                    break;
                }
            }

            if (isLeafObject) {
                // 如果是叶子对象，创建新的 ObjectNode 用于扁平化
                ObjectNode leafObj = objectMapper.createObjectNode();
                objectNode.fields().forEachRemaining(entry -> {
                    // 拼接字段名并去除空格
                    String fieldName = parentName.isEmpty() ? entry.getKey().replace(" ", "") : (parentName + "_" + entry.getKey()).replace(" ", "");
                    leafObj.set(fieldName, entry.getValue());
                });
                publishFlowFile(leafObj, path, parentFlowFile, storagePrefix, session);
            } else {
                // 非叶子对象，递归处理其子节点
                objectNode.fields().forEachRemaining(entry -> {
                    JsonNode childNode = entry.getValue();
                    String currentKey = entry.getKey().replace(" ", ""); // 去除当前键的空格
                    String newPath = path.isEmpty() ? currentKey : (path + "_" + currentKey); // 拼接路径

                    // 如果子节点是对象或数组，则递归处理
                    if (childNode.isObject() || childNode.isArray()) {
                        processNode(childNode, currentKey, newPath, parentFlowFile, storagePrefix, session);
                    }
                });
            }
        } else if (node.isArray()) {
            // 如果是 JSON 数组
            ArrayNode arrayNode = (ArrayNode) node;

            // 检查是否为叶子数组 (所有元素都是叶子对象)
            boolean isLeafArray = true;
            for (JsonNode element : arrayNode) {
                if (!element.isObject() || !isLeafObject(element)) { // isLeafObject 辅助方法判断
                    isLeafArray = false;
                    break;
                }
            }

            if (isLeafArray) {
                // 如果是叶子数组，为数组中的每个对象添加拼接的字段名称
                ArrayNode leafArray = objectMapper.createArrayNode();
                arrayNode.forEach(item -> {
                    if (item.isObject()) {
                        ObjectNode newItem = objectMapper.createObjectNode();
                        item.fields().forEachRemaining(entry -> {
                            // 拼接字段名并去除空格
                            String fieldName = parentName.isEmpty() ? entry.getKey().replace(" ", "") : (parentName + "_" + entry.getKey()).replace(" ", "");
                            newItem.set(fieldName, entry.getValue());
                        });
                        leafArray.add(newItem);
                    }
                });
                publishFlowFile(leafArray, path, parentFlowFile, storagePrefix, session);
            } else {
                // 非叶子数组，递归处理每个元素
                arrayNode.forEach(item -> {
                    processNode(item, parentName, path, parentFlowFile, storagePrefix, session);
                });
            }
        }
        // 对于基本类型节点，不进行处理，因为它们不是可以被拆分的层级
    }

    /**
     * 辅助方法：检查一个 JsonNode 是否为叶子对象。
     *
     * @param node 要检查的 JsonNode
     * @return 如果是叶子对象则返回 true，否则返回 false。
     */
    private boolean isLeafObject(JsonNode node) {
        if (!node.isObject()) {
            return false;
        }
        Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> field = fields.next();
            if (field.getValue().isObject() || field.getValue().isArray()) {
                return false;
            }
        }
        return true;
    }

    /**
     * 将处理后的 JSON 内容写入新的 FlowFile 并传输。
     *
     * @param contentNode    要写入 FlowFile 的 JSON 内容
     * @param path           当前节点在 JSON 结构中的路径
     * @param parentFlowFile 原始 FlowFile
     * @param storagePrefix  storageprefix 属性值
     * @param session        NiFi 处理会话
     */
    private void publishFlowFile(JsonNode contentNode, String path, FlowFile parentFlowFile, String storagePrefix, ProcessSession session) {
        FlowFile newFlowFile = session.create(parentFlowFile);
        newFlowFile = session.write(newFlowFile, new OutputStreamCallback() {
            @Override
            public void process(final OutputStream out) throws IOException {
                // 将 JSON 内容写入输出流
                objectMapper.writeValue(out, contentNode);
            }
        });

        // 设置 table_name 属性
        String tableNamePath = path.isEmpty() ? "data_details" : path;
        newFlowFile = session.putAttribute(newFlowFile, "table_name", storagePrefix + "_" + tableNamePath);

        // 传输新创建的 FlowFile 到成功关系
        session.transfer(newFlowFile, REL_SUCCESS);
        getLogger().debug("已使用表名创建新的FlowFile {}：{}", newFlowFile, storagePrefix + "_" + tableNamePath);
    }
}
